package com.avris.tool.consumer;


import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;


/**
 * Factory for Kafka consumers that connect to a fixed list of brokers.
 *
 * <p>Each {@code createConsumer} overload builds a brand-new consumer; the caller
 * owns the returned instance and is responsible for closing it.
 *
 * @Author jast
 * @Date 2020/4/19 2:22 PM
 * @Version 1.0
 */
public class KafkaConsumerClient {

    /** Value used for {@code bootstrap.servers} on every consumer created here. */
    private final String kafkaBrokers;

    /**
     * Creates a client bound to the given brokers.
     *
     * @param kafkaBrokers value passed as {@code bootstrap.servers} to every consumer
     */
    public KafkaConsumerClient(String kafkaBrokers) {
        this.kafkaBrokers = kafkaBrokers;
    }

    /**
     * Creates a consumer without a group id or subscription, intended for
     * reading topic information.
     *
     * @return a new consumer using {@link LongDeserializer} for keys and
     *         {@link StringDeserializer} for values
     */
    public Consumer<Long, String> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }

    /**
     * Creates a consumer subscribed to {@code topic}, intended for consuming in
     * order to modify offsets. Auto-commit is disabled and each poll returns at
     * most one record.
     *
     * <p>NOTE(review): the key deserializer configured here is
     * {@link StringDeserializer} (this was already the effective setting, because
     * the original code overwrote an earlier {@code LongDeserializer} entry), while
     * the declared key type is {@code Long}. Callers must not read record keys as
     * {@code Long}; doing so will fail with a {@code ClassCastException}. The
     * signature is kept unchanged for existing callers.
     *
     * @param topic topic to subscribe to
     * @param group consumer group id
     * @return a new consumer subscribed to {@code topic}
     */
    public Consumer<Long, String> createConsumer(String topic, String group) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        // Offsets are managed explicitly by the caller, so auto-commit stays off.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Both key and value are read as strings; see the NOTE in the Javadoc.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Maximum number of records returned by a single poll.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);

        final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    /**
     * Creates a consumer subscribed to {@code topic}, intended for consuming data.
     * Auto-commit is enabled with a 3000 ms interval.
     *
     * @param topic            topic to subscribe to
     * @param group            consumer group id
     * @param max_poll_records maximum number of records returned by a single poll
     * @param isLatest         {@code true} to set {@code auto.offset.reset} to
     *                         {@code "latest"}, {@code false} for {@code "earliest"}
     * @return a new consumer subscribed to {@code topic}, with string keys and values
     */
    public KafkaConsumer<String, String> createConsumer(String topic, String group, int max_poll_records, boolean isLatest) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        // Controls how many records each poll may return.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, max_poll_records);
        // Session timeout.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 90000);
        // Request timeout.
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 100000);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 90000);
        // Auto-commit is enabled for this consumer.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatest ? "latest" : "earliest");
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, Integer.MAX_VALUE);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, Integer.MAX_VALUE);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

}
